use crate::conf_init::MapResult;
use crate::source::{Source, SourceResult, SourceFormat};
use zeromq::{PullSocket, Socket, SocketRecv, ZmqResult};
use crate::data_frame::json::JsonValue;
use crate::indicators::{indicators_key_ref, IndicatorsMode, Op};
use std::fmt::Debug;
use tokio::runtime::Runtime;
use std::collections::VecDeque;
use std::time::Duration;
use crate::data_frame::single_data::SingleData;

#[derive(Deserialize,Debug)]
pub enum ZmqConf{
    Pull(ZmqPullConf),
}

#[derive(Deserialize,Debug,Clone)]
pub struct ZmqPullConf{
    pub endpoint: Vec<String>,
}

#[derive(Debug)]
pub enum Zmq{
    Pull(ZmqPull),
}

pub struct ZmqPull{
    name: String,
    format: SourceFormat,
    conf: ZmqPullConf,
    pull_socket: PullSocket,
}
impl Debug for ZmqPull{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ZmqPull")
            .field("name", &self.name)
            .field("format", &self.format)
            .field("conf", &self.conf)
            .finish()
    }
}
impl Zmq{
    pub fn new(name: &str, format: &SourceFormat, conf: &ZmqConf)->MapResult<Box<dyn Source>>{

        match conf{
            ZmqConf::Pull(pull_conf) => {

                let pull_socket = PullSocket::new();
                let zmq_pull = ZmqPull{
                    name: name.to_string(),
                    format: format.clone(),
                    conf: pull_conf.clone(),
                    pull_socket,
                };
                MapResult::Ok(Box::new(Zmq::Pull(zmq_pull)))
            },
        }
    }
    pub async fn recv(name: String, data: Vec<Vec<u8>>)->Result<Vec<SingleData>,()>{

        let mut data_list = Vec::new();

        for d in data.into_iter(){
            let msg_string = match String::from_utf8(d) {
                Ok(t) => { t },
                Err(e) => {
                    indicators_key_ref(Op::Add, IndicatorsMode::Source, name.as_str(), "trans err", 1);
                    continue;
                },
            };

            let json_value = match JsonValue::build_new(msg_string) {
                Ok(t) => { t },
                Err(e) => {
                    indicators_key_ref(Op::Add, IndicatorsMode::Source, name.as_str(), "json err", 1);
                    continue;
                },
            };
            let data = SingleData::new(json_value);
            data_list.push(data);
        }

        return Ok(data_list);
    }
}

#[async_trait]
impl Source for Zmq{
    async fn init(&mut self)->bool{
        match self{
            Zmq::Pull(zmq_pull) => {
                for endpoint in zmq_pull.conf.endpoint.iter(){
                    match zmq_pull.pull_socket.bind(endpoint).await{
                        Ok(_) => {},
                        Err(e) => {
                            warn!("zmq pull bind err {:?}", e);
                            return false;
                        },
                    }
                }
                return true;
            },
        }
    }
    async fn recv(&mut self, runtime: &Runtime, timeout: Duration, burst: usize, concurrent: usize)->SourceResult{

        let sleep = tokio::time::sleep(timeout);
        tokio::pin!(sleep);
        let mut data = Vec::new();
        match self{
            Zmq::Pull(zmq_pull) => {

                let mut join_set = VecDeque::new();
                'back:
                loop {
                    tokio::select! {
                        msg = zmq_pull.pull_socket.recv() => {

                            match msg {
                                ZmqResult::Ok(msg) => {
                                    for m in msg.iter() {
                                        data.push(m.to_vec());
                                    }
                                },
                                ZmqResult::Err(e) => {
                                    return SourceResult::Err(());
                                },
                            }

                            if data.len() >= burst{
                                let tmp_name = zmq_pull.name.clone();
                                let t = runtime.spawn(
                                    Zmq::recv(tmp_name, data)
                                );
                                data = Vec::new();
                                join_set.push_back(t);
                            }

                            if join_set.len() >= concurrent{
                                break 'back;
                            }
                        }
                        _ = &mut sleep => {

                            let tmp_name = zmq_pull.name.clone();
                            let t = runtime.spawn(
                                Zmq::recv(tmp_name, data)
                            );
                            join_set.push_back(t);

                            let mut tmp = Vec::new();

                            while let Some(t) = join_set.pop_front(){
                                if let Ok(t) = t.await{
                                    if let Ok(t) = t{
                                        tmp.extend_from_slice(t.as_slice());
                                    }else{
                                        return SourceResult::Err(());
                                    }
                                }else{
                                    return SourceResult::Err(());
                                }
                            }
                            
                            return SourceResult::Ok(tmp);
                        }
                    }
                }

                let tmp_name = zmq_pull.name.clone();
                let t = runtime.spawn(
                    Zmq::recv(tmp_name, data)
                );
                join_set.push_back(t);

                let mut tmp = Vec::new();

                while let Some(t) = join_set.pop_front(){
                    if let Ok(t) = t.await{
                        if let Ok(t) = t{
                            tmp.extend_from_slice(t.as_slice());
                        }else{
                            return SourceResult::Err(());
                        }
                    }else{
                        return SourceResult::Err(());
                    }
                }

                return SourceResult::Ok(tmp);
            },
        }
    }
}